Fix possible deadlock in AWS pubsub #804
Conversation
Force-pushed from 68c47b5 to 2594652.
var lastFetch atomic.Pointer[time.Time]
var epoch time.Time
lastFetch.Store(&epoch)
There was a race on this, so I've moved it into an atomic
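For reference, a minimal sketch of the pattern, assuming the same shape as the diff above; the reader and writer goroutines here are illustrative, not the PR's actual code:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var lastFetch atomic.Pointer[time.Time]

	// Seed with the zero time so Load never returns nil.
	var epoch time.Time
	lastFetch.Store(&epoch)

	// Writer: the fetch loop records when it last fetched.
	go func() {
		now := time.Now()
		lastFetch.Store(&now)
	}()

	// Reader: safe to call from any goroutine, no mutex needed.
	time.Sleep(10 * time.Millisecond)
	fmt.Println("last fetch at:", lastFetch.Load())
}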
case <-fetchCtx.Done():
	return
I think this was the deadlock: if all the workers panicked and quit, the fetch processor could have been left trying to write onto the workChan with nothing pulling items off it. I've solved this by adding this new select, so if the ctx is done we don't even try to push to the channel, and I've also added additional panic recovery wrappers at other points in the code.
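A hedged sketch of the shape of that fix; Work, workChan, and pushWork are stand-ins for the PR's actual types, not its code:

package main

import "context"

// Work is a placeholder for the PR's work item type.
type Work struct{ ID int }

// pushWork guards every channel send with a select on the fetch context,
// so the fetcher can never block forever on workChan if all the workers
// have panicked and exited.
func pushWork(fetchCtx context.Context, workChan chan<- Work, items []Work) {
	for _, item := range items {
		select {
		case <-fetchCtx.Done():
			// Shutdown (or total worker failure): stop pushing
			// instead of deadlocking on a channel nobody reads.
			return
		case workChan <- item:
		}
	}
}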
// We should only long poll for 20 seconds, so if this takes more than
// 30 seconds we should cancel the context and try again
//
// We do this in case the ReceiveMessage call gets stuck on the server
// and doesn't return
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
One theory I have is that the AWS library might be stalling and blocking under high load, so I've introduced a smaller timeout to try to force a context-cancelled error.
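A minimal sketch of that guard, assuming the aws-sdk-go-v2 SQS client; receiveOnce and queueURL are illustrative names:

import (
	"context"
	"time"

	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
)

// receiveOnce wraps a 20-second long poll in a 30-second timeout: if
// ReceiveMessage stalls server-side, the wrapper context expires and the
// call returns context.DeadlineExceeded, letting the fetch loop retry.
func receiveOnce(parent context.Context, client *sqs.Client, queueURL string) ([]types.Message, error) {
	ctx, cancel := context.WithTimeout(parent, 30*time.Second)
	defer cancel()

	out, err := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
		QueueUrl:            &queueURL,
		WaitTimeSeconds:     20, // server-side long poll
		MaxNumberOfMessages: 10,
	})
	if err != nil {
		return nil, err
	}
	return out.Messages, nil
}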
// Check if the context has been cancelled, and if so, return the error
if ctx.Err() != nil {
	return ctx.Err()
Why? This might hide other errors from the func if it fails for some other reason.
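To illustrate the concern: classifying the returned error keeps the original cause visible, where a pre-emptive ctx.Err() check would replace it with context.Canceled. doWork here is hypothetical:

package main

import (
	"context"
	"errors"
)

// process returns the operation's own error rather than ctx.Err(), so a
// real failure isn't masked; expected shutdown cancellations are filtered
// out with errors.Is.
func process(ctx context.Context, doWork func(context.Context) error) error {
	err := doWork(ctx)
	if errors.Is(err, context.Canceled) {
		return nil // expected during shutdown, not a real failure
	}
	return err
}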
// If there was an error processing the message, apply the backoff policy
_, delay := utils.GetDelay(retryPolicy.MaxRetries, retryPolicy.MinBackoff, retryPolicy.MaxBackoff, uint16(deliveryAttempt))
- _, visibilityChangeErr := t.sqsClient.ChangeMessageVisibility(t.ctx, &sqs.ChangeMessageVisibilityInput{
+ _, visibilityChangeErr := t.sqsClient.ChangeMessageVisibility(ctx, &sqs.ChangeMessageVisibilityInput{
Shouldn't we do this with a context not derived from the input, so we do this even if the input context is canceled?
No, I deliberately wanted both these API calls to be released when the fetch context is cancelled, as we immediately go into a loop to go again. I think them being based on t.ctx rather than the fetch context could have been an issue: we never cancel t.ctx, but the fetchCtx is cancelled when we want to exit the WorkConcurrently code.
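A sketch of the context relationship being described, with names following the discussion rather than the exact code:

package main

import "context"

// workConcurrently derives fetchCtx from the long-lived parent (t.ctx in
// the PR). cancelFetch fires when the function returns, releasing every
// in-flight SQS call made with fetchCtx, while the parent is never cancelled.
func workConcurrently(parent context.Context, call func(context.Context)) {
	fetchCtx, cancelFetch := context.WithCancel(parent)
	defer cancelFetch()

	// ChangeMessageVisibility / DeleteMessage would use fetchCtx here, so
	// they unblock as soon as we exit and loop around to fetch again.
	call(fetchCtx)
}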
} else {
	// If the message was processed successfully, delete it from the queue
-	_, err = t.sqsClient.DeleteMessage(t.ctx, &sqs.DeleteMessageInput{
+	_, err = t.sqsClient.DeleteMessage(ctx, &sqs.DeleteMessageInput{
Same here
fetchWithPanicHandling := func(ctx context.Context, maxToFetch int) (work []Work, err error) {
	defer func() {
		if r := recover(); r != nil {
			err = errs.B().Msgf("panic: %v", r).Err()
Can we include the stack like we do elsewhere?
errs.B() will build in a stack, no?
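If it doesn't, a minimal standard-library fallback is to capture the stack explicitly at recover time; this sketch is illustrative, not the PR's code:

package main

import (
	"fmt"
	"runtime/debug"
)

// fetchSafely records the panicking goroutine's stack in the error itself.
// debug.Stack must be called inside the deferred function so that it still
// reflects the frames of the panic.
func fetchSafely(fetch func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic: %v\n%s", r, debug.Stack())
		}
	}()
	return fetch()
}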